package io.openmessaging.demo;

import io.openmessaging.BytesMessage;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * File-backed message store built on memory-mapped segment files.
 *
 * <p>Every destination is persisted as a sequence of segment files named
 * {@code msg.<lower-cased base name>-<index>} inside the store directory. The first four
 * bytes of a segment hold an {@code int}: the absolute end position of the data written
 * so far. Message payloads start right after that header.
 *
 * <p>Writes to the same destination are serialized through a per-destination lock.
 * Reads keep their progress per {@code customerId} and are not synchronized, so each
 * consumer must use its own id.
 *
 * Author :  Rocky
 * Date : 20/05/2017 14:01
 */
public class StoreUtils {

    // NOTE(review): not referenced anywhere in this class (and the name misspells "SIZE");
    // presumably the intended upper bound for a single message - confirm before using it.
    private static final int MSG_MAX_SZIE = 256 * 1024;

    /** Counter exposed to callers; this class itself never updates it. */
    public static AtomicLong wroteBytes = new AtomicLong(0);

    /** Size in bytes of the end-position header at the start of every segment. */
    private static final int HEADER_SIZE = 4;

    /** Number of bytes mapped for every segment opened for writing. */
    private static final int FILE_MAX_SIZE = 100 * 1024 * 1024;

    // destination path (without the "-<index>" suffix) -> segment currently being written
    private static final Map<String, MappedByteBuffer> mbbBuffer = new ConcurrentHashMap<>();
    // destination path -> index of the newest segment created for it
    private static final Map<String, AtomicInteger> fileIndex = new ConcurrentHashMap<>();
    // destination path -> monitor guarding segment creation, rollover and appends
    private static final Map<String, Object> writeLocks = new ConcurrentHashMap<>();

    /**
     * Appends {@code message} to the current segment of the given destination, creating the
     * first segment on demand and rolling over to a new one when the current one rejects
     * the message.
     *
     * <p>The whole operation runs under a lock that is stable per destination path, so the
     * header is always written before any payload and a rollover cannot be observed half-way
     * by a concurrent writer.
     *
     * @param storeDirPath directory holding the segment files
     * @param fileBaseName destination name; it is prefixed with {@code "msg."} and lower-cased
     * @param message      message to persist
     * @throws IOException if a segment cannot be opened or mapped, or if the message is
     *                     rejected even by a freshly created segment
     */
    public static void writeMessage(String storeDirPath, String fileBaseName, BytesMessage message) throws IOException {
        String storeName = ("msg." + fileBaseName).toLowerCase();
        String filePath = new File(storeDirPath, storeName).getPath();

        Object lock = writeLocks.computeIfAbsent(filePath, key -> new Object());
        synchronized (lock) {
            MappedByteBuffer mbb = mbbBuffer.get(filePath);
            // true while the current segment was created by this call and holds no message yet
            boolean freshSegment = false;
            if (mbb == null) {
                fileIndex.putIfAbsent(filePath, new AtomicInteger(0));
                mbb = openSegmentForWrite(filePath + "-0");
                mbbBuffer.put(filePath, mbb);
                freshSegment = true;
            }

            // Assumes MessageUtils2.writeMessage returns false when the message does not fit
            // into the remaining space of the buffer - TODO confirm against MessageUtils2.
            while (!MessageUtils2.writeMessage(message, mbb)) {
                if (freshSegment) {
                    // Retrying on yet another empty segment would only loop forever and
                    // fill the disk with empty files.
                    throw new IOException("message was rejected by an empty segment of "
                            + FILE_MAX_SIZE + " bytes for " + filePath);
                }
                int fi = fileIndex.computeIfAbsent(filePath, key -> new AtomicInteger(0)).incrementAndGet();
                String fileName = filePath + "-" + fi;
                mbb = openSegmentForWrite(fileName);
                mbbBuffer.put(filePath, mbb);
                System.out.println("进行文件扩展,新的文件名 ： " + fileName);
                freshSegment = true;
            }
            // Publish the new end of data in the segment header.
            mbb.putInt(0, mbb.position());
        }
    }

    /**
     * Maps {@code fileName} read-write for {@link #FILE_MAX_SIZE} bytes and writes an initial
     * header that marks the segment as empty.
     *
     * <p>The file handle is closed before returning; the mapping stays valid afterwards.
     */
    private static MappedByteBuffer openSegmentForWrite(String fileName) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(fileName, "rw")) {
            MappedByteBuffer mbb = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, FILE_MAX_SIZE);
            // An empty segment ends right after its header; this also leaves the buffer
            // positioned at the first payload byte.
            mbb.putInt(HEADER_SIZE);
            return mbb;
        }
    }


    // consumer id -> segments that consumer still has to read
    private static final Map<Object, Iterator<File>> waitReadFileNames = new ConcurrentHashMap<>();
    // consumer id -> segment that consumer is currently reading
    private static final Map<Object, MappedByteBuffer> currentPostion = new ConcurrentHashMap<>();

    /**
     * Returns the next message for the given consumer, or {@code null} once every segment
     * has been consumed.
     *
     * <p>The list of segments is determined on the first call for a {@code customerId} and
     * is not refreshed afterwards.
     *
     * @param customerId    must uniquely identify the consumer
     * @param storeDirPath  directory holding the segment files
     * @param fileBaseNames destination names whose segments should be read, in this order
     * @return the next message, or {@code null} when nothing is left to read
     * @throws IOException if a segment cannot be opened, read or mapped
     */
    public static BytesMessage readMessage(Object customerId, String storeDirPath, List<String> fileBaseNames) throws IOException {
        // Work out which files have to be read (first call for this consumer only).
        Iterator<File> pending = waitReadFileNames.get(customerId);
        if (pending == null) {
            File storeDir = new File(storeDirPath);
            List<File> fileList = new ArrayList<>();
            for (String fileBaseName : fileBaseNames) {
                fileList.addAll(listFile(storeDir, ("msg." + fileBaseName + "-").toLowerCase()));
            }
            pending = fileList.iterator();
            waitReadFileNames.putIfAbsent(customerId, pending);
        }

        // Advance to the next segment whenever there is no current one or it is used up;
        // segments without any payload are skipped.
        MappedByteBuffer mbb = currentPostion.get(customerId);
        while (mbb == null || !mbb.hasRemaining()) {
            if (!pending.hasNext()) {
                return null;
            }
            mbb = openSegmentForRead(pending.next());
            currentPostion.put(customerId, mbb);
        }

        return MessageUtils2.readMessage(mbb);
    }

    /**
     * Maps the payload region of a segment read-only, using the end position stored in the
     * segment header.
     *
     * <p>A header value smaller than the header size is treated as "no payload" and yields
     * an empty buffer.
     */
    private static MappedByteBuffer openSegmentForRead(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            int endPosition = raf.readInt();
            long payloadLength = Math.max(0L, (long) endPosition - HEADER_SIZE);
            return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, HEADER_SIZE, payloadLength);
        }
    }

    /**
     * Lists the segment files of one destination in the order they were created.
     *
     * @param baseDir      directory to scan (not recursive)
     * @param fileBaseName lower-cased file name prefix including the trailing {@code '-'}
     * @return the segments sorted by ascending index; empty when the directory cannot be listed
     */
    private static List<File> listFile(File baseDir, String fileBaseName) {
        List<File> result = new ArrayList<>();
        String[] fileNames = baseDir.list();
        if (fileNames == null) {
            // Not a directory, or an I/O error occurred: nothing to read.
            return result;
        }

        for (String fileName : fileNames) {
            if (segmentIndex(fileName, fileBaseName) >= 0) {
                result.add(new File(baseDir, fileName));
            }
        }
        Collections.sort(result, (left, right) -> Long.compare(
                segmentIndex(left.getName(), fileBaseName),
                segmentIndex(right.getName(), fileBaseName)));
        return result;
    }

    /**
     * Extracts the numeric segment index from a file name.
     *
     * @return the index, or {@code -1} when the lower-cased name is not exactly
     *         {@code prefix} followed by decimal digits
     */
    private static long segmentIndex(String fileName, String prefix) {
        String lowered = fileName.toLowerCase();
        if (!lowered.startsWith(prefix)) {
            return -1;
        }
        String suffix = lowered.substring(prefix.length());
        // 18 digits always fit into a long.
        if (suffix.isEmpty() || suffix.length() > 18) {
            return -1;
        }
        for (int i = 0; i < suffix.length(); i++) {
            char c = suffix.charAt(i);
            if (c < '0' || c > '9') {
                return -1;
            }
        }
        return Long.parseLong(suffix);
    }

    /**
     * Currently a no-op: mapped segments are not forced to disk here.
     */
    public static void flush() {
        // Intentionally empty.
    }


}


